查看原文
其他

StreamNative 和中国移动宣布开源 AoP: Apache Pulsar 支持原生 AMQP 协议

李鹏辉、翟佳等 StreamNative 2021-04-22

我们很高兴地宣布 StreamNative 和中国移动开源了 “AoP”(AMQP on Pulsar)。AoP 将 AMQP 协议处理插件引入 Pulsar broker。这样一来,Apache Pulsar 就支持原生 AMQP 协议。


与 KoP(https://hub.streamnative.io/protocol-handlers/kop/0.2.0)相似,AoP 是一种可插拔的协议处理插件。


将 AoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 RabbitMQ 应用程序和服务迁移到 Pulsar。


这样,RabbitMQ 应用程序就可以使用 Pulsar 的强大功能,例如利用 Apache BookKeeper 保存事件流和 Pulsar 分层存储特性。



 什么是 Apache Pulsar?

Apache Pulsar 是一个流数据平台。最初,Apache Pulsar 就采用云原生、分层分片的架构。该架构将服务和存储分离开来,使系统实现更友好的容器化。


Pulsar 的云原生架构具备强扩展性、高一致性和高弹性,使公司能通过实时数据解决方案扩展业务。自 2016 年开源以来,Pulsar 已得到广泛采用,并于 2018 年成为 Apache 顶级项目。


对 AoP 的渴望

Plusar 为队列和流工作负载提供统一的消息模型。Pulsar 支持自己基于 protobuf 的二进制协议,以确保高性能和低延迟。protobuf 有利于实现 Pulsar 客户端。


而且,该项目也支持 Java,Go,Python 和 C ++ 语言以及社区提供的第三方客户端。
https://pulsar.apache.org/docs/en/client-libraries/#thirdparty-clients


但是,对于使用其他消息传输协议编写的应用程序,用户必须重写这些应用程序,否则这些应用程序无法采用 Pulsar 新的统一消息传输协议。


为了解决这一问题,Pulsar 社区开发了一些应用程序,以便将 AMQP 应用程序从其他消息系统迁移到 Pulsar。例如,Pulsar 提供了 RabbitMQ Source connector RabbitMQ Sink connector,允许用户在 Pulsar 和 RabbitMQ 之间传输数据。但是,那些想要从其他 AMQP 应用程序切换到 Pulsar 的用户仍然有强烈的需求。


StreamNative 和中国移动的合作

StreamNative 收到大量的用户请求,请求帮助从其他消息系统迁移到 Pulsar 。同时,StreamNative 也意识到在 Pulsar 上原生支持其他消息传输协议(例如 AMQP 和 Kafka)的必要性。


所以,StreamNative 开始致力于将通用协议处理插件框架引入到 Pulsar 中。该框架允许使用其他消息传输协议的开发人员使用 Pulsar。


中国移动是 OpenStack Foundation 的金牌会员,拥有全球最多的 OpenStack 集群部署实践。 RabbitMQ 是 OpenStack 消息中间件的缺省集成。在部署和维护 RabbitMQ 的过程中,中国移动遭遇了巨大挑战。


在 OpenStack 系统中,作为 RPC 通信组件,RabbitMQ 处理大量流入和流出的消息。在操作过程中,经常会积压消息。这将导致内存异常,从而进一步导致进程因为内存异常而卡住。


另一方面,RabbitMQ 的镜像队列用于确保数据的高可用性。当节点进入异常状态时,整个群集将无法正常使用。此外,RabbitMQ 的编程语言 erlang 晦涩难懂且难以解决。综上所述,考虑到 RabbitMQ 集群的不稳定性,以及操作、维护和故障排除的难度,中国移动计划开发一种可以替代 RabbitMQ 的中间件产品。


同时,中国移动的公有云中有很多需要使用 AMQP 消息队列的客户,但是现有的 RabbitMQ 不满足云访问的条件,因此,中国移动的中间件团队开始研究 AMQP 消息队列。在对比 Qpid,RocketMQ 和 Pulsar 之后,中国移动被 Pulsar 的计算存储分离架构所吸引。


在对 Pulsar 进行了一段时间的调查之后,ChinaMobile 发现 StreamNative 已经开源了 KoP,这使得中国移动确定基于 Pulsar 开发 AMQP 具有可行性。中国移动开始与 StreamNative 共同开发 AoP 协议处理插件。




实现方式



AoP 框架概览

AoP 是一个可插拔的协议处理插件,可以通过使用 Pulsar 的 topics, cursors 等特性在 Pulsar 上支持原生 AMQP 协议。


下图展示了AoP 协议处理插件与 Pulsar 集群的结合。AMQP Proxy 服务和 AMQP 协议处理插件都与 Pulsar broker 一起运行。目前,AoP 是基于 AMQP 0.9.1 协议进行开发,我们也在考虑增加对 AMQP 1.0 协议的支持。



 AoP 基础概念

AMQP 0.9.1 引入了一些基础概念,例如 Exchagne, Queue 和 Router。这些与 Pulsar 的模型有着较大的区别。所以我们需要找到一种方法,支持利用 Pulsar 现有的一些特征,并将它们联系在一起。下图展示了消息在 AoP 中的流转,并讨论了关于消息持久化,消息路由,消息投递的细节。



  1. 当 Producer 发送消息到 AmqpExchange,AmqpExchange 将消息持久化到 Pulsar Topic (我们称之为存储原始消息的 Topic)。
  2. AmqpExchange 的 replicator 会将消息传递给 Router。
  3. Router 判断是否需要将消息路由给 AmqpQueue。如果是,会将原始消息的 ID 存入AmqpQueue 的 Topic 中 (我们称之为存储索引消息的 Topic)。
  4. AmqpQueue 将消息传递给 consumer。


📒 AmqpExchange

AmqpExchange 包含一个原始消息 Topic,用来保存 AMQP producer 发送的消息。AmqpExchange 的 replicator 会将消息处理到 AMQP 队列中。Replicator 是基于 Pulsar 的持久化游标,可以确保成功将消息发送到队列,而不会丢失消息。

📒 AmqpMessageRouter

AmqpMessageRouter 用于维护消息路由类型以及将消息从 AmqpExchange 路由到 AmqpQueue 的路由规则。路由类型和路由规则这些原数据都持久化在 Pulsar 的存储中。所以就算 broker 重启,我们也可以恢复 AmqpMessageRouter。

📒 AmqpQueue

AmqpQueue 提供一个索引消息 Topic,用来存储路由到这个队列的  IndexMessage。IndexMessage 由原始消息的 ID 和存储消息的 Exchange 的名称组成。当 AmqpQueue 向 consumer 发送消息时,AmqpQueue 会根据 IndexMessage 读取原始消息数据,然后将其发送给 consumer。

📒 Vhost 分配

在 AoP 中,一个 AMQP Vhost 只能由一个 Pulsar broker 提供服务,而一个 Pulsar broker 可以为多个 Vhost 提供服务。所以增加 Vhost 和 broker 的数量可以达到横向扩容的效果。通过使用更多的 Vhost 可以使用户构建更大的 AoP 集群。

在 AoP 中,一个 Vhost 基于一个 Pulsar namespace,并且这个 namespace 只能有一个 bundle。如果一台 broker 崩溃,其他的 broker 可以接管这台崩溃的 broker 维护的 Vhost。Vhost 也可以利用 broker 的负载均衡机制。broker 可以将 Vhost 从一台高负载的机器转移到一台空闲的机器。下图展示了 Vhost 在 broker 上的分配情况。


📒 Proxy

AoP Proxy 用于在客户端与 AMQP 服务连接时,帮助查找负责处理 Vhost 数据的 owner broker,并在客户端与 Vhost 的 owner broker 之间传输数据。

如上一节所述,Vhost 由集群中的一个 broker 提供服务,这可以通过 Pulsar 的 Topic 发现机制来实现。这也是为什么一个 Vhost 只能由带有单个 bundle 的 namespace 支持。如果一个 namespace 有多个 bundle,用户很难通过 Vhost 名称来确定 owner broker。

下图说明了 AoP Proxy 的服务流程。

  1. AMQP 客户端建立与 AoP Proxy 的连接。

  2. AoP Proxy 向 Pulsar cluster 发送查找请求,以便确定 Vhost 的 owner broker 的 URL 地址。

  3. Pulsar 集群 将 owner broker 的 URL 地址返回给 AoP Proxy。

  4. AoP Proxy 建立与 Vhose 的 owner broker 的连接并开始在 AMQP 客户端和 Vhost 的 owner broker 之间传输数据。



目前,AoP Proxy 与 Pulsar broker 共同工作。用户可以通过配置 amqpProxyEnable 来选择是否开启 AoP Proxy 服务。
https://github.com/streamnative/aop/wiki



🙋‍♂️ 试用 AoP

AoP 使用 Apache License V2 许可证,项目地址为:
https://github.com/streamnative/aop

StreamNative Platform 1.1 版本也即将发布,该版本内置 AoP。你可以选择下载 StreamNative Platform 来试用 AoP 的所有功能。
http://streamnative.io/download

你也可以在 StreamNative Hub官网下载  AoP:
https://github.com/streamnative/aop/releases/download/v0.1.0/pulsar-protocol-handler-amqp-0.1.0-SNAPSHOT.nar

如果已经运行 Pulsar 集群,并且希望其支持 AMQP 协议,可以将 AoP 协议处理插件安装到现有的 Pulsar 集群。关于如何安装和使用 AoP 的详细信息,请参考 AoP 文档。
https://hub.streamnative.io/protocol-handlers/aop/0.1.0/


🎙️ 致 谢

最初,StreamNative 发起 AoP 项目。后来中国移动的研发团队加入了该项目。我们一起合作开发 AoP 项目。非常感谢中国移动的胡宗棠、王少杰张浩对这个项目的贡献!

想要随时掌握 Pulsar 的研发进展、用户案例和热点话题吗?快来关注 Apache Pulsar 和 StreamNative 微信公众号,我们第一时间在这里分享与 Pulsar 有关的一切。

👇🏻点击「阅读原文」查看项目地址

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存